package com.atguigu.flink.sql.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Created by Smexy on 2023/9/16
 *
 * Demonstrates OVER windows in Flink SQL. Like window functions in Hive,
 * an OVER window is NOT a group aggregation — each input row keeps its own
 * output row:
 *
 *   select
 *      xxx,
 *      window_function() over ( partition by xxx order by xxx  frame_clause )
 *   from xx
 *
 * Frame clause:
 *      rows | range between UPPER_BOUND and LOWER_BOUND
 *
 * ---------------------------
 * In Flink:
 *      rows  : the frame is defined by row count.
 *      range : the frame is defined by a time interval.
 * The lower bound is fixed — it can only be CURRENT ROW; it cannot reach
 * rows/time after the current row.
 */
public class Demo5_OverWindow
{
    public static void main(String[] args) throws Exception {

        // Stream environment with parallelism 1 so the printed output is ordered.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: lines from a socket, parsed into WaterSensor POJOs.
        SingleOutputStreamOperator<WaterSensor> sensorStream = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Schema exposing both a processing-time attribute (pt) and an
        // event-time attribute (et, derived from ts) with a 1 ms watermark delay.
        Schema sensorSchema = Schema.newBuilder()
                                    .column("id", "STRING")
                                    .column("ts", "BIGINT")
                                    .column("vc", "INT")
                                    .columnByExpression("pt", "PROCTIME()")
                                    .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts,3)")
                                    .watermark("et", "et - INTERVAL '0.001' SECOND")
                                    .build();

        tableEnv.createTemporaryView("t1", sensorStream, sensorSchema);

        /*
         * Demo: OVER window with a ROWS frame.
         *     The window can only be ordered in ASCENDING mode.
         *     OVER windows' ordering in stream mode must be defined on a time attribute.
         *     OVER RANGE FOLLOWING windows are not supported yet.
         *         order by may only sort ascending.
         *         order by in streaming mode must reference a time attribute.
         *         the frame cannot extend past the current row —
         *         the lower bound can only be CURRENT ROW.
         *
         *     ------------
         *     When ordering by event time, the computation only fires once
         *     watermark >= event time.
         *
         *     ROWS frames count rows only, ignoring the time range: rows sharing
         *     the same event time still get different windows.
         */
        String overByRows = " select " +
                            "  id,ts,vc, " +
                            // partition by id, order by pt ascending; frame = current row plus all preceding rows
                            //"  sum(vc) over(partition by id order by pt  rows between UNBOUNDED preceding  and CURRENT ROW )  sumVc" +
                            // partition by id, order by pt ascending; frame = current row plus 1 preceding row
                            // "  sum(vc) over(partition by id order by pt  rows between 1 preceding  and CURRENT ROW )  sumVc" +
                            // partition by id, order by et ascending; frame = current row plus 1 preceding row
                            "  sum(vc) over(partition by id order by et  rows between 1 preceding  and CURRENT ROW )  sumVc" +
                            "  from t1 ";

        /*
         * RANGE frames look only at the time range: rows sharing the same event
         * time compute over the same window.
         */
        String overByRange = " select " +
            "  id,ts,vc, " +
            // partition by id, order by et ascending; frame = all rows up to the current event time
            //"  sum(vc) over(partition by id order by et  range between UNBOUNDED preceding  and CURRENT ROW )  sumVc" +
            // partition by id, order by et ascending; frame = rows within the 2 s preceding the current event time
            "  sum(vc) over(partition by id order by et  range between INTERVAL '2' second preceding  and CURRENT ROW )  sumVc" +
            "  from t1 ";

        /*
         * All aggregates must be computed on the same window.
         *     If one query invokes OVER several times, every aggregate must use
         *     the identical OVER definition — hence the named WINDOW clause.
         */
        String namedWindowSql = " select " +
            "  id,ts,vc, " +
            "  sum(vc) over w sumVc ," +
            "  min(vc) over w  minVc ," +
            "  max(vc) over w  maxVc " +
            "  from t1 " +
            "  window w as (partition by id order by pt  rows between 1 preceding  and CURRENT ROW ) ";

        // Only the named-window query is executed; the two above are kept as
        // reference variants for the demo.
        tableEnv.sqlQuery(namedWindowSql).execute().print();
    }
}
